package main

import (
	"context"
	"fmt"
	"gitee.com/zackeus/go-boot/rocketmq"
	"gitee.com/zackeus/go-boot/rocketmq/rlog"
	"gitee.com/zackeus/go-zero/core/logx"
	"os"
	"time"

	"gitee.com/zackeus/go-boot/rocketmq/consumer"
	"gitee.com/zackeus/go-boot/rocketmq/primitive"
)

const (
	Topic         = "test-delay-topic"
	ConsumerGroup = "test-delay-group"
	Endpoint      = "192.168.137.26:9876"
	AccessKey     = ""
	SecretKey     = ""
)

func main() {
	if err := logx.SetUp(logx.LogConf{Mode: "console", Encoding: "plain"}); err != nil {
		fmt.Println(err)
		return
	}
	rlog.SetLevel("error")

	sig := make(chan os.Signal)
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName(ConsumerGroup),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{Endpoint})),
	)

	if err := c.Subscribe(Topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			t := time.Now().UnixNano()/int64(time.Millisecond) - msg.BornTimestamp
			fmt.Printf("Receive message[msgId=%s] %d ms later\n", msg.MsgId, t)
		}

		return consumer.ConsumeSuccess, nil
	}); err != nil {
		fmt.Println(err.Error())
	}

	// Note: start after subscribe
	if err := c.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	<-sig
	if err := c.Shutdown(); err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}
